#include "s3select.h"

#include <arpa/inet.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <cstdint>
#include <cstdio>
#include <cstring>
#include <fstream>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include <boost/crc.hpp>

// Bring the s3select engine namespace, and the namespace named by the
// BOOST_SPIRIT_CLASSIC_NS macro, into file scope so that names such as
// s3select and csv_object can be used unqualified below.
using namespace s3selectEngine;
using namespace BOOST_SPIRIT_CLASSIC_NS;

class awsCli_handler {


//TODO get parameter 
private:
  std::unique_ptr<s3selectEngine::s3select> s3select_syntax;
  std::string m_s3select_query;
  std::string m_result;
  std::unique_ptr<s3selectEngine::csv_object> m_s3_csv_object;
  std::string m_column_delimiter;//TODO remove
  std::string m_quot;//TODO remove
  std::string m_row_delimiter;//TODO remove
  std::string m_compression_type;//TODO remove
  std::string m_escape_char;//TODO remove
  std::unique_ptr<char[]>  m_buff_header;
  std::string m_header_info;
  std::string m_sql_query;
  uint64_t m_total_object_processing_size;

public:

  awsCli_handler():
      s3select_syntax(std::make_unique<s3selectEngine::s3select>()),
      m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()),
      m_buff_header(std::make_unique<char[]>(1000)),
      m_total_object_processing_size(0),
      crc32(std::unique_ptr<boost::crc_32_type>())
  {
  }

  enum header_name_En
  {
    EVENT_TYPE,
    CONTENT_TYPE,
    MESSAGE_TYPE
  };
  static const char* header_name_str[3];

  enum header_value_En
  {
    RECORDS,
    OCTET_STREAM,
    EVENT,
    CONT
  };
  static const char* header_value_str[4];

private:

    void encode_short(char *buff, uint16_t s, int &i)
    {
      short x = htons(s);
      memcpy(buff, &x, sizeof(s));
      i += sizeof(s);
    }

    void encode_int(char *buff, u_int32_t s, int &i)
    {
      u_int32_t x = htonl(s);
      memcpy(buff, &x, sizeof(s));
      i += sizeof(s);
    }

  int create_header_records(char* buff)
  {
  int i = 0;

  //1
  buff[i++] = char(strlen(header_name_str[EVENT_TYPE]));
  memcpy(&buff[i], header_name_str[EVENT_TYPE], strlen(header_name_str[EVENT_TYPE]));
  i += strlen(header_name_str[EVENT_TYPE]);
  buff[i++] = char(7);
  encode_short(&buff[i], uint16_t(strlen(header_value_str[RECORDS])), i);
  memcpy(&buff[i], header_value_str[RECORDS], strlen(header_value_str[RECORDS]));
  i += strlen(header_value_str[RECORDS]);

  //2
  buff[i++] = char(strlen(header_name_str[CONTENT_TYPE]));
  memcpy(&buff[i], header_name_str[CONTENT_TYPE], strlen(header_name_str[CONTENT_TYPE]));
  i += strlen(header_name_str[CONTENT_TYPE]);
  buff[i++] = char(7);
  encode_short(&buff[i], uint16_t(strlen(header_value_str[OCTET_STREAM])), i);
  memcpy(&buff[i], header_value_str[OCTET_STREAM], strlen(header_value_str[OCTET_STREAM]));
  i += strlen(header_value_str[OCTET_STREAM]);

  //3
  buff[i++] = char(strlen(header_name_str[MESSAGE_TYPE]));
  memcpy(&buff[i], header_name_str[MESSAGE_TYPE], strlen(header_name_str[MESSAGE_TYPE]));
  i += strlen(header_name_str[MESSAGE_TYPE]);
  buff[i++] = char(7);
  encode_short(&buff[i], uint16_t(strlen(header_value_str[EVENT])), i);
  memcpy(&buff[i], header_value_str[EVENT], strlen(header_value_str[EVENT]));
  i += strlen(header_value_str[EVENT]);

  return i;
}

  std::unique_ptr<boost::crc_32_type> crc32;

  int create_message(std::string &out_string, u_int32_t result_len, u_int32_t header_len)
  {
    u_int32_t total_byte_len = 0;
    u_int32_t preload_crc = 0;
    u_int32_t message_crc = 0;
    int i = 0;
    char *buff = out_string.data();

    if (crc32 == 0)
    {
      // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
      crc32 = std::unique_ptr<boost::crc_32_type>(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>);
    }

    total_byte_len = result_len + 16;

    encode_int(&buff[i], total_byte_len, i);
    encode_int(&buff[i], header_len, i);

    crc32->reset();
    *crc32 = std::for_each(buff, buff + 8, *crc32);
    preload_crc = (*crc32)();
    encode_int(&buff[i], preload_crc, i);

    i += result_len;

    crc32->reset();
    *crc32 = std::for_each(buff, buff + i, *crc32);
    message_crc = (*crc32)();

    int out_encode;
    encode_int(reinterpret_cast<char*>(&out_encode), message_crc, i);
    out_string.append(reinterpret_cast<char*>(&out_encode),sizeof(out_encode));

    return i;
  }

#define PAYLOAD_LINE "\n<Payload>\n<Records>\n<Payload>\n"
#define END_PAYLOAD_LINE "\n</Payload></Records></Payload>"

public:

  //std::string get_error_description(){}

  std::string get_result()
  {
    return m_result;
  }

  int run_s3select(const char *query, const char *input, size_t input_length, size_t object_size)
  {
    int status = 0;
    csv_object::csv_defintions csv;

    m_result = "012345678901"; //12 positions for header-crc

    int header_size = 0;

    if (m_s3_csv_object == 0)
    {
      s3select_syntax->parse_query(query);

      if (m_row_delimiter.size())
      {
        csv.row_delimiter = *m_row_delimiter.c_str();
      }

      if (m_column_delimiter.size())
      {
        csv.column_delimiter = *m_column_delimiter.c_str();
      }

      if (m_quot.size())
      {
        csv.quot_char = *m_quot.c_str();
      }

      if (m_escape_char.size())
      {
        csv.escape_char = *m_escape_char.c_str();
      }

      if (m_header_info.compare("IGNORE") == 0)
      {
        csv.ignore_header_info = true;
      }
      else if (m_header_info.compare("USE") == 0)
      {
        csv.use_header_info = true;
      }

      m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object(s3select_syntax.get(), csv));
    }

    if (s3select_syntax->get_error_description().empty() == false)
    {
      header_size = create_header_records(m_buff_header.get());
      m_result.append(m_buff_header.get(), header_size);
      m_result.append(PAYLOAD_LINE);
      m_result.append(s3select_syntax->get_error_description());
      //ldout(s->cct, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}" << dendl;
      status = -1;
    }
    else
    {
      header_size = create_header_records(m_buff_header.get());
      m_result.append(m_buff_header.get(), header_size);
      m_result.append(PAYLOAD_LINE);
      //status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, s->obj_size);
      status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, object_size);
      if (status < 0)
      {
        m_result.append(m_s3_csv_object->get_error_description());
      }
    }

    if (m_result.size() > strlen(PAYLOAD_LINE))
    {
      m_result.append(END_PAYLOAD_LINE);
      create_message(m_result, m_result.size() - 12, header_size);
      //s->formatter->write_bin_data(m_result.data(), buff_len);
      //if (op_ret < 0)
      //{
      //  return op_ret;
      //}
    }
    //rgw_flush_formatter_and_reset(s, s->formatter);

    return status;
  }
  //int extract_by_tag(std::string tag_name, std::string& result);

  //void convert_escape_seq(std::string& esc);

  //int handle_aws_cli_parameters(std::string& sql_query);

};

// Header names, indexed by awsCli_handler::header_name_En.
const char* awsCli_handler::header_name_str[3] = {":event-type", ":content-type", ":message-type"};
// Header values, indexed by awsCli_handler::header_value_En.
const char* awsCli_handler::header_value_str[4] = {"Records", "application/octet-stream", "event","cont"};
// Forward declaration; the definition follows main().
int run_on_localFile(char*  input_query);

// Command-line entry point.
//
//   -q   <query>  s3select statement to execute.
//   -key <file>   process <file> in chunks through awsCli_handler and print
//                 each framed message. Without -key the query is handed to
//                 run_on_localFile(), which takes the input from the query's
//                 FROM clause.
//
// Returns -1 on invalid arguments or an unreadable input file; otherwise the
// status of the last run_s3select() call (or of run_on_localFile()).
int main(int argc,char **argv)
{
  char *query = nullptr;
  char *fname = nullptr;
  bool using_key_flag = false;

  for (int i = 1; i < argc; i++)
  {
    if (!strcmp(argv[i], "-key"))
    {
      // argv[argc] is guaranteed to be a null pointer, so a trailing "-key"
      // leaves fname null; that case is rejected below.
      fname = argv[i + 1];
      using_key_flag = true;
      continue;
    }

    if (!strcmp(argv[i], "-q"))
    {
      query = argv[i + 1];
      continue;
    }
  }

  if (using_key_flag == false)
  {
    // run_on_localFile() reports a missing query itself.
    return run_on_localFile(query);
  }

  if (fname == nullptr)
  {
    std::cout << "type -key <file name>" << std::endl;
    return -1;
  }

  if (query == nullptr)
  {
    std::cout << "type -q 'select ... from ...  '" << std::endl;
    return -1;
  }

  // Closes the stream on every exit path.
  struct file_closer
  {
    void operator()(FILE* f) const { fclose(f); }
  };
  std::unique_ptr<FILE, file_closer> fp(fopen(fname, "r"));

  if(!fp)
  {
    std::cout << " input stream is not valid, abort;" << std::endl;
    return -1;
  }

  // Query the stream that was actually opened: fopen() follows symbolic
  // links, so stat-ing the path with lstat() could report the link's size.
  struct stat statbuf;
  if (fstat(fileno(fp.get()), &statbuf) != 0)
  {
    std::cout << " input stream is not valid, abort;" << std::endl;
    return -1;
  }
  const size_t object_size = static_cast<size_t>(statbuf.st_size);

  // Input is fed to the engine in 4 KiB parts.
  constexpr size_t buffer_size = 4 * 1024;
  std::vector<char> buff(buffer_size);

  awsCli_handler awscli;
  int status = 0;

  while (true)
  {
    const size_t input_sz = fread(buff.data(), 1, buff.size(), fp.get());
    status = awscli.run_s3select(query, buff.data(), input_sz, object_size);
    if(status<0)
    {
      std::cout << "failure on execution " << std::endl;
      break;
    }
    std::cout << awscli.get_result() << std::endl;

    if(!input_sz || feof(fp.get()))
    {
      break;
    }
  }

  return status;
}

// Demonstrates the s3select functionality on a local input.
//
// The input is the object named in the query's FROM clause; the name "stdin"
// selects standard input. The data is read in 4 MiB parts: every part but the
// last goes through run_s3select_on_stream(), the last one through
// run_s3select_on_object() with aggregation enabled. Results are printed to
// standard output as they are produced.
//
// input_query: s3select statement; may be null (usage is printed).
// Returns 0 on success, -1 on a missing/invalid query, an unreadable input or
// an execution failure.
int run_on_localFile(char*  input_query)
{
  s3select s3select_syntax;

  if (!input_query)
  {
    std::cout << "type -q 'select ... from ...  '" << std::endl;
    return -1;
  }

  const int parse_status = s3select_syntax.parse_query(input_query);
  if (parse_status != 0)
  {
    std::cout << "failed to parse query " << s3select_syntax.get_error_description() << std::endl;
    return -1;
  }

  const std::string object_name = s3select_syntax.get_from_clause();
  const bool read_from_stdin = (object_name.compare("stdin") == 0);

  FILE* fp = read_from_stdin ? stdin : fopen(object_name.c_str(), "r");

  if(!fp)
  {
    std::cout << " input stream is not valid, abort;" << std::endl;
    return -1;
  }

  // Size of the opened stream. Using fstat() on the stream (instead of
  // stat-ing the name) keeps the value well defined for "stdin" as well.
  // NOTE(review): for a pipe the reported size is typically 0 -- confirm how
  // run_s3select_on_stream() treats an object size smaller than the data fed.
  struct stat statbuf = {};
  if (fstat(fileno(fp), &statbuf) != 0)
  {
    statbuf.st_size = 0;
  }
  const size_t object_size = static_cast<size_t>(statbuf.st_size);

  s3selectEngine::csv_object::csv_defintions csv;
  csv.use_header_info = false;
  //csv.column_delimiter='|';
  //csv.row_delimiter='\t';

  s3selectEngine::csv_object  s3_csv_object(&s3select_syntax, csv);

  // simulate 4mb parts in s3 object
  constexpr size_t buff_size = 4 * 1024 * 1024;
  std::vector<char> buff(buff_size);

  std::string s3select_result;
  int exit_status = 0;

  while (true)
  {
    const size_t input_sz = fread(buff.data(), 1, buff.size(), fp);
    const bool last_chunk = (input_sz == 0) || feof(fp);

    int run_status;
    if (last_chunk)
    {
      // Final part: run on the object with aggregation so pending results are emitted.
      run_status = s3_csv_object.run_s3select_on_object(s3select_result, buff.data(), input_sz, false, false, true);
    }
    else
    {
      run_status = s3_csv_object.run_s3select_on_stream(s3select_result, buff.data(), input_sz, object_size);
    }

    if (run_status < 0)
    {
      std::cout << "failure on execution " << std::endl << s3_csv_object.get_error_description() <<  std::endl;
      exit_status = -1; // report the failure to the caller instead of returning success
      break;
    }

    if (s3select_result.size() > 1)
    {
      std::cout << s3select_result;
    }
    s3select_result.clear();

    if (last_chunk)
    {
      break;
    }
  }

  // Only close what this function opened; stdin is left to the runtime.
  if (!read_from_stdin)
  {
    fclose(fp);
  }

  return exit_status;
}
